/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.dq.data.service.impl.function.data;

import static org.unidata.mdm.dq.core.type.constant.CleanseConstants.OUTPUT_PORT_1;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.unidata.mdm.core.type.data.Attribute;
import org.unidata.mdm.core.type.data.Attribute.AttributeType;
import org.unidata.mdm.core.type.data.CodeAttribute;
import org.unidata.mdm.core.type.data.CodeAttribute.CodeDataType;
import org.unidata.mdm.core.type.data.SimpleAttribute;
import org.unidata.mdm.core.type.data.SimpleAttribute.SimpleDataType;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.service.impl.function.system.AbstractSystemCleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionConfiguration;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionExecutionScope;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType;
import org.unidata.mdm.dq.core.type.constant.CleanseConstants;
import org.unidata.mdm.dq.core.type.io.DataQualityError;
import org.unidata.mdm.dq.core.type.io.DataQualitySpot;
import org.unidata.mdm.dq.core.type.rule.SeverityIndicator;
import org.unidata.mdm.dq.core.util.DQUtils;
import org.unidata.mdm.meta.type.search.EntityIndexType;
import org.unidata.mdm.meta.type.search.RecordHeaderField;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.dto.SearchResultDTO;
import org.unidata.mdm.search.service.SearchService;
import org.unidata.mdm.search.type.FieldType;
import org.unidata.mdm.search.type.form.FieldsGroup;
import org.unidata.mdm.search.type.form.FormField;
import org.unidata.mdm.search.type.query.SearchQuery;
import org.unidata.mdm.system.util.TextUtils;

/**
 * @author Ruslan Trachuk
 */
public class CheckRecordAttributesUnique extends AbstractSystemCleanseFunction {
    /**
     * Display name code.
     */
    private static final String FUNCTION_DISPLAY_NAME = "app.dq.functions.data.check.attributes.unique.display.name";
    /**
     * Description code.
     */
    private static final String FUNCTION_DESCRIPTION = "app.dq.functions.data.avoid.duplicates.decsription";
    /**
     * Search_by_all_periods parameter.
     */
    private static final String INPUT_PORT_ALL_PERIODS = "all-periods";
    /**
     * IS_SEARCH_BY_ALL_PERIODS PARAMETER.
     */
    private static final String INPUT_PORT_ALL_PERIODS_NAME = "app.dq.functions.data.check.attributes.unique.all-periods.name";
    /**
     * IS_SEARCH_BY_ALL_PERIODS PARAMETER.
     */
    private static final String INPUT_PORT_ALL_PERIODS_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.all-periods.description";
    /**
     * IP1 name code.
     */
    private static final String INPUT_PORT_1_NAME = "app.dq.functions.data.check.attributes.unique.input.port1.name";
    /**
     * IP1 description code.
     */
    private static final String INPUT_PORT_1_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port1.decsription";
    /**
     * IP2 name code.
     */
    private static final String INPUT_PORT_2_NAME = "app.dq.functions.data.check.attributes.unique.input.port2.name";
    /**
     * IP2 description code.
     */
    private static final String INPUT_PORT_2_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port2.decsription";
    /**
     * IP3 name code.
     */
    private static final String INPUT_PORT_3_NAME = "app.dq.functions.data.check.attributes.unique.input.port3.name";
    /**
     * IP3 description code.
     */
    private static final String INPUT_PORT_3_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port3.decsription";
    /**
     * IP4 name code.
     */
    private static final String INPUT_PORT_4_NAME = "app.dq.functions.data.check.attributes.unique.input.port4.name";
    /**
     * IP4 description code.
     */
    private static final String INPUT_PORT_4_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port4.decsription";
    /**
     * IP5 name code.
     */
    private static final String INPUT_PORT_5_NAME = "app.dq.functions.data.check.attributes.unique.input.port5.name";
    /**
     * IP5 description code.
     */
    private static final String INPUT_PORT_5_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port5.decsription";
    /**
     * IP6 name code.
     */
    private static final String INPUT_PORT_6_NAME = "app.dq.functions.data.check.attributes.unique.input.port6.name";
    /**
     * IP6 description code.
     */
    private static final String INPUT_PORT_6_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port6.decsription";
    /**
     * IP7 name code.
     */
    private static final String INPUT_PORT_7_NAME = "app.dq.functions.data.check.attributes.unique.input.port7.name";
    /**
     * IP7 description code.
     */
    private static final String INPUT_PORT_7_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port7.decsription";
    /**
     * IP8 name code.
     */
    private static final String INPUT_PORT_8_NAME = "app.dq.functions.data.check.attributes.unique.input.port8.name";
    /**
     * IP8 description code.
     */
    private static final String INPUT_PORT_8_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.input.port8.decsription";
    /**
     * OP1 name code.
     */
    private static final String OUTPUT_PORT_1_NAME = "app.dq.functions.data.check.attributes.unique.output.port1.name";
    /**
     * OP1 description code.
     */
    private static final String OUTPUT_PORT_1_DESCRIPTION = "app.dq.functions.data.check.attributes.unique.output.port1.decsription";
    /**
     * This function configuration.
     */
    private static final CleanseFunctionConfiguration CONFIGURATION
        = CleanseFunctionConfiguration.configuration()
            .supports(CleanseFunctionExecutionScope.GLOBAL, CleanseFunctionExecutionScope.LOCAL)
            .input(CleanseFunctionConfiguration.port()
                    .name(INPUT_PORT_ALL_PERIODS)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_ALL_PERIODS_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_ALL_PERIODS_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.BOOLEAN)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_2)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_2_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_2_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_3)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_3_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_3_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_4)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_4_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_4_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_5)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_5_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_5_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_6)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_6_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_6_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.BOOLEAN)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_7)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_7_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_7_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_8)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_8_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_8_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(OUTPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.BOOLEAN)
                    .required(true)
                    .build())
            .build();
    /**
     * Search service.
     */
    @Autowired
    private SearchService searchService;
    /**
     * The data ports.
     */
    private static final String[] DATA_PORTS = {
            CleanseConstants.INPUT_PORT_1,
            CleanseConstants.INPUT_PORT_2,
            CleanseConstants.INPUT_PORT_3,
            CleanseConstants.INPUT_PORT_4,
            CleanseConstants.INPUT_PORT_5,
            CleanseConstants.INPUT_PORT_6,
            CleanseConstants.INPUT_PORT_7,
            CleanseConstants.INPUT_PORT_8
    };
    /**
     * Instantiates a new cleanse function abstract.
     *
     * @param clazz cleanse function class.
     */
    public CheckRecordAttributesUnique() {
        super("CheckRecordAttributesUnique", () -> TextUtils.getText(FUNCTION_DISPLAY_NAME), () -> TextUtils.getText(FUNCTION_DESCRIPTION));
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionConfiguration configure() {
        return CONFIGURATION;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        // Output
        CleanseFunctionResult result = new CleanseFunctionResult();

        RecordKeys keys = null;
        if (Objects.nonNull(ctx.getPayload()) && ctx.getPayload() instanceof UpsertRequestContext) {
            keys = ((UpsertRequestContext) ctx.getPayload()).keys();
        }

        // Problems with input
        if (Objects.isNull(keys)) {

            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, Boolean.FALSE));
            result.addError(DataQualityError.builder()
                    .ruleName(ctx.getRuleName())
                    .functionName(getName())
                    .category(DQUtils.CATEGORY_SYSTEM)
                    .message(TextUtils.getText("app.dq.functions.data.check.attributes.unique.payload.missing", ctx.getRuleName(), getName()))
                    .severity(SeverityIndicator.RED)
                    .build());

            return result;
        }

        // Collect values.
        List<FormField> searchFields = new ArrayList<>();
        Collection<Attribute> selected = new ArrayList<>(5);
        for (int i = 0; i < DATA_PORTS.length; i++) {

            CleanseFunctionInputParam param = ctx.getInputParam(DATA_PORTS[i]);
            if (Objects.nonNull(param) && !param.isEmpty() && param.isSingleton()) {

                Attribute attr = param.getSingleton();
                FieldType type = extractDataType(attr);
                Object val = extractDataValue(attr);
                if (type == null || val == null || (type == FieldType.STRING && StringUtils.isBlank((String) val))) {
                    continue;
                }

                selected.add(attr);
                searchFields.add(FormField.exact(type, attr.getName(), false, val));
            }
        }

        boolean isDuplicate = false;
        if (!searchFields.isEmpty()) {

            FieldsGroup searchFieldsGroup = FieldsGroup.and(searchFields);
            FieldsGroup specifiedAndGroup = specifiedAndGroupFields(ctx, ((UpsertRequestContext) ctx.getPayload()), keys);
            SearchRequestContext sCtx = SearchRequestContext.builder(EntityIndexType.RECORD, keys.getEntityName())
                    .filter(SearchQuery.formQuery(searchFieldsGroup, specifiedAndGroup))
                    .totalCount(true)
                    .countOnly(true)
                    .build();

            SearchResultDTO searchResult = searchService.search(sCtx);
            isDuplicate = searchResult.getTotalCount() != 0;
        }

        result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, !isDuplicate));
        result.addSpots(selected.stream()
                .map(attr -> new DataQualitySpot(attr.toLocalPath(), attr))
                .collect(Collectors.toList()));

        return result;
    }
    /**
     * Extracts SDT of the attribute
     * @param attr the attribute
     * @return SDT or null
     */
    private FieldType extractDataType(Attribute attr) {

        if (Objects.nonNull(attr)) {
            if (attr.getAttributeType() == AttributeType.CODE) {
                return ((CodeAttribute<?>) attr).getDataType().toSearchType();
            } else if (attr.getAttributeType() == AttributeType.SIMPLE) {
                return ((SimpleAttribute<?>) attr).getDataType().toSearchType();
            }
        }

        return null;
    }

    /**
     * Extracts value of the attribute
     * @param attr the attribute
     * @return value or null
     */
    private Object extractDataValue(Attribute attr) {

        if (Objects.nonNull(attr)) {
            if (attr.getAttributeType() == AttributeType.CODE) {
                CodeAttribute<?> cast = (CodeAttribute<?>) attr;
                return cast.getDataType() == CodeDataType.STRING && cast.getValue() == null
                        ? StringUtils.EMPTY
                        : cast.getValue();
            } else if (attr.getAttributeType() == AttributeType.SIMPLE) {
                SimpleAttribute<?> cast = (SimpleAttribute<?>) attr;
                return cast.getDataType() == SimpleDataType.STRING && cast.getValue() == null
                        ? StringUtils.EMPTY
                        : cast.getValue();
            }
        }

        return null;
    }
    /**
     * Creates AND field group.
     * @param cfc the CF context
     * @return group
     */
    @SuppressWarnings("unchecked")
    private FieldsGroup specifiedAndGroupFields(CleanseFunctionContext cfc, UpsertRequestContext ctx, RecordKeys keys) {

        Collection<FormField> filter = new ArrayList<>();
        filter.add(FormField.exact(RecordHeaderField.FIELD_DELETED, Boolean.FALSE));
        filter.add(FormField.exact(RecordHeaderField.FIELD_INACTIVE, Boolean.FALSE));

        // skip self for existing records
        if (!keys.isNew()) {
            filter.add(FormField.exactNegate(RecordHeaderField.FIELD_ETALON_ID, keys.getEtalonKey().getId()));
        }

        CleanseFunctionInputParam periods = cfc.getInputParam(INPUT_PORT_ALL_PERIODS);
        if (BooleanUtils.isFalse(
                Objects.nonNull(periods)
                && !periods.isEmpty()
                && ((SimpleAttribute<Boolean>) periods.getSingleton()).getValue())) {

            if (Objects.nonNull(ctx.getValidTo())) {
                filter.add(FormField.range(RecordHeaderField.FIELD_FROM, null, ctx.getValidTo()));
            }

            if (Objects.nonNull(ctx.getValidFrom())) {
                filter.add(FormField.range(RecordHeaderField.FIELD_TO, ctx.getValidFrom(), null));
            }
        }

        return FieldsGroup.and(filter);
    }
}
